AWS SDK for Go を使ってS3へストリーミングアップロードを行う
Gopherってビーバーなんですかね?
はじめに
最近「Go言語による並行処理」を読んでGoルーティンとチャネルを組み合わせたプラグラムを試してみたかったので練習がてらS3へのストリーミングアップロードを行うサンプルを書いてみました。
やること
このプログラムでやりたいことは以下の通りです。
- データソースからデータを読み込んでinputチャネルに送信する
- inputチャネルからデータを読み込んでS3へストリーミングアップロードする
- アップロード先のキーはファイルが特定のサイズになるごとに切り替える(チャンク化する)
作戦
チャンク化を以下のように行います。
- ストリーミングアップロードを行うためにio.Pipeを使ってinputチャネルから読んだデータをSDKのmanager.Uploaderにフィードする(io.Readerを渡せる)
- Uploaderにフィードしたデータ件数を数えておいて一定数に達したらパイプをクローズしアップロードを完了する
- inputチャネルがクローズされるまで上記を繰り返し行う
コード
package infra import ( "context" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/feature/s3/manager" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/google/uuid" "go.uber.org/zap" "io" "sync" ) type Uploader struct { Bucket string Prefix string logger *zap.Logger wg *sync.WaitGroup } func NewUpLoader(bucket string, prefix string, logger *zap.Logger, wg *sync.WaitGroup) Uploader { return Uploader{ Bucket: bucket, Prefix: prefix, logger: logger, wg: wg, } } // ChunkedUpload チャンクサイズを指定してinputの内容をS3へアップロードする. func (up *Uploader) ChunkedUpload(ctx context.Context, size int, input chan []byte) { go up.upload(ctx, input, size) } func (up *Uploader) upload(ctx context.Context, input chan []byte, size int) { up.wg.Add(1) defer up.wg.Done() for { //チャンネルクローズ検出用のcontext inputChannelContext, cancel := context.WithCancel(ctx) func(cancel context.CancelFunc) { r, w := io.Pipe() defer w.Close() up.startS3Upload(r) var cnt = 0 for { select { case <-ctx.Done(): return case in, ok := <-input: if ok { //チャンクサイズをインクリメント cnt++ w.Write(in) } else { //クローズされたらコンテキストに通知 cancel() } //チャンクがいっぱいになったらクローズする if cnt == size || !ok { return } } } }(cancel) //呼び出し側からのキャンセル or チャネルクローズで終了 select { case <-ctx.Done(): return case <-inputChannelContext.Done(): return default: } } } func (up *Uploader) startS3Upload(r *io.PipeReader) { up.wg.Add(1) go func() { defer up.wg.Done() key := up.Prefix + uuid.NewString() ctx := context.Background() req := s3.PutObjectInput{ Bucket: &up.Bucket, Key: &key, Body: r, } logger := up.logger.With(zap.String("key", key)) logger.Debug("start S3 uploading") if _, err := newS3Uploader(ctx).Upload(ctx, &req); err == nil { logger.Info("upload done") } else { logger.Error(err.Error()) } }() } func newS3Uploader(ctx context.Context) *manager.Uploader { // localstackを使うためにエンドポイント設定を上書き resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { return aws.Endpoint{ URL: "http://localhost:4567", HostnameImmutable: true, }, nil }) cfg, _ := config.LoadDefaultConfig(ctx, config.WithEndpointResolverWithOptions(resolver)) client := s3.NewFromConfig(cfg) upload := manager.NewUploader(client) return upload }
実行してみる
これに次のようなデータソースを接続して試してみます。
package infra import ( "context" "fmt" "go.uber.org/zap" "sync" "time" ) type Source struct { logger *zap.Logger wg *sync.WaitGroup } func NewSource(logger *zap.Logger, wg *sync.WaitGroup) Source { return Source{ logger: logger, wg: wg, } } func (source *Source) Feed(ctx context.Context, cnt int) chan []byte { output := make(chan []byte) source.wg.Add(1) go func() { defer close(output) defer source.wg.Done() for i := 0; i < cnt; i++ { select { case <-ctx.Done(): return case output <- []byte(fmt.Sprintf("%d\n", i)): } time.Sleep(time.Millisecond * 5) } }() return output }
適当なサイズのデータを入力して実行してみます。
func main() { ctx, cancel := context.WithCancel(context.Background()) logger, _ := zap.NewDevelopment() wg := sync.WaitGroup{} defer cancel() upload := infra.NewUpLoader( "bucket", "prefix-", logger, &wg, ) source := infra.NewSource(logger, &wg) in := source.Feed(ctx, 5000) upload.ChunkedUpload(ctx, 1000, in) wg.Wait() }
出力はこんな感じです。
2022-02-22T15:46:00.259+0900 DEBUG infra/s3uploader.go:90 start S3 uploading {"key": "prefix-13fea82d-bcb0-4fab-83be-d9a67576560e"} 2022-02-22T15:46:06.202+0900 DEBUG infra/s3uploader.go:90 start S3 uploading {"key": "prefix-db0e6579-df9e-4f3d-8e10-177d3a02a71a"} 2022-02-22T15:46:11.220+0900 INFO infra/s3uploader.go:92 upload done {"key": "prefix-13fea82d-bcb0-4fab-83be-d9a67576560e"} 2022-02-22T15:46:12.114+0900 DEBUG infra/s3uploader.go:90 start S3 uploading {"key": "prefix-a8c17f0f-04d8-4fdf-91f9-1d8c5f6ee7ac"} 2022-02-22T15:46:17.144+0900 INFO infra/s3uploader.go:92 upload done {"key": "prefix-db0e6579-df9e-4f3d-8e10-177d3a02a71a"} 2022-02-22T15:46:18.121+0900 DEBUG infra/s3uploader.go:90 start S3 uploading {"key": "prefix-c607370b-947d-47a6-8067-7b9a36333e1f"} 2022-02-22T15:46:23.138+0900 INFO infra/s3uploader.go:92 upload done {"key": "prefix-a8c17f0f-04d8-4fdf-91f9-1d8c5f6ee7ac"} 2022-02-22T15:46:24.105+0900 DEBUG infra/s3uploader.go:90 start S3 uploading {"key": "prefix-b7fc80e6-56d0-49e5-b52f-554399de83ec"} 2022-02-22T15:46:29.130+0900 INFO infra/s3uploader.go:92 upload done {"key": "prefix-c607370b-947d-47a6-8067-7b9a36333e1f"} 2022-02-22T15:46:30.099+0900 DEBUG infra/s3uploader.go:90 start S3 uploading {"key": "prefix-6f2bfd8c-cddb-4667-84af-a8e2df3999a7"} 2022-02-22T15:46:35.143+0900 INFO infra/s3uploader.go:92 upload done {"key": "prefix-b7fc80e6-56d0-49e5-b52f-554399de83ec"} 2022-02-22T15:46:35.144+0900 INFO infra/s3uploader.go:92 upload done {"key": "prefix-6f2bfd8c-cddb-4667-84af-a8e2df3999a7"}
実際にS3上に作成されるファイルは以下のようになります。サイズが異なるのはデータとして連番の数値を使っていてその桁数が異なるためです。(チャンク数とファイル数が空ファイルの設定次第で空ファイルができてしまうのも気になります)
awslocal --endpoint-url=http://localhost:4567 s3 ls s3://bucket/ 2022-02-22 15:46:11 3890 prefix-13fea82d-bcb0-4fab-83be-d9a67576560e 2022-02-22 15:46:35 0 prefix-6f2bfd8c-cddb-4667-84af-a8e2df3999a7 2022-02-22 15:46:23 5000 prefix-a8c17f0f-04d8-4fdf-91f9-1d8c5f6ee7ac 2022-02-22 15:46:35 5000 prefix-b7fc80e6-56d0-49e5-b52f-554399de83ec 2022-02-22 15:46:29 5000 prefix-c607370b-947d-47a6-8067-7b9a36333e1f 2022-02-22 15:46:17 5000 prefix-db0e6579-df9e-4f3d-8e10-177d3a02a71a
まとめ
今更ですが日々の暮らしのちょっとした平行処理にGoルーティンは便利だなと思いました。